{
  "metadata" : {
    "name" : "Saving Twitter Stream as Json",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-streaming-twitter % _", "com.google.code.gson % gson % 2.3", "- org.apache.spark % spark-core_2.10 % _", "- org.apache.hadoop % _ % _" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "2463F2A514D249878A11BA48C0EEB9A8"
    },
    "cell_type" : "markdown",
    "source" : "#Twitter example"
  }, {
    "metadata" : {
      "id" : "405930C33B8F4E8B861560EFD4D51780"
    },
    "cell_type" : "markdown",
    "source" : "## Set up"
  }, {
    "metadata" : {
      "id" : "F79A2C46267F4643888294AA7684EC60"
    },
    "cell_type" : "markdown",
    "source" : "### Install the twitter credentials "
  }, {
    "metadata" : {
      "id" : "CCD907D39E964C4A9BA31099589476B7"
    },
    "cell_type" : "markdown",
    "source" : "**Note:** we are using the `env` variables here. For this, adapt the following and execute before launching the server\n```\n  export TWITTER_CONSUMER_KEY=...\n  export TWITTER_CONSUMER_SECRET=\"...\n  export TWITTER_ACCESS_TOKEN=...\n  export TWITTER_ACCESS_TOKEN_SECRET=...\n```"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "B7D20B895CBD4B4789A82D526B33F37F"
    },
    "cell_type" : "code",
    "source" : "def $(s:String) = sys.env(s)\nSystem.setProperty(\"twitter4j.oauth.consumerKey\", $(\"TWITTER_CONSUMER_KEY\"))\nSystem.setProperty(\"twitter4j.oauth.consumerSecret\", $(\"TWITTER_CONSUMER_SECRET\"))\nSystem.setProperty(\"twitter4j.oauth.accessToken\", $(\"TWITTER_ACCESS_TOKEN\"))\nSystem.setProperty(\"twitter4j.oauth.accessTokenSecret\", $(\"TWITTER_ACCESS_TOKEN_SECRET\"))\n\"twitter settings done!\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E260606E36354EA4918A3802CAB1E8AC"
    },
    "cell_type" : "markdown",
    "source" : "## Spark streaming"
  }, {
    "metadata" : {
      "id" : "44F892B0C30741939AF5EEA5F0097228"
    },
    "cell_type" : "markdown",
    "source" : "### Create context with batch of 10 minutes duration"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "AA08ABE03FED4A9983864BCE76F6D7B9"
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.streaming.{Minutes, StreamingContext}\nimport org.apache.spark.SparkContext._\nimport org.apache.spark.streaming.twitter._\n\nval ssc = new StreamingContext(sparkContext, Minutes(10))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CBAABDF3125940A0A0B4F01DBE666808"
    },
    "cell_type" : "markdown",
    "source" : "### Listen twitter stream "
  }, {
    "metadata" : {
      "id" : "D4F1199F7C0C45F98E6294A434AEE245"
    },
    "cell_type" : "markdown",
    "source" : "#### We're going to **filter** the tweets to only those containing the following words."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "64AE068F97784B0A9C1E116B2923D301"
    },
    "cell_type" : "code",
    "source" : "val filters = Array(\"spark\", \"scala\", \"music\", \"machinelearning\", \"sparknotebook\", \"apachespark\", \"docker\",\n                    \"apachemesos\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8B8AC6675D2343D08D786056591A3634"
    },
    "cell_type" : "markdown",
    "source" : "#### Create the twitter listeners"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "B4D50A791F79463483D6E434D671AF43"
    },
    "cell_type" : "code",
    "source" : "val twitterStream = TwitterUtils.createStream(ssc, None, filters)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "C8F149D79153452F835B72436BA6707D"
    },
    "cell_type" : "markdown",
    "source" : "### Convert back to Json and save"
  }, {
    "metadata" : {
      "id" : "42C77B0315714DE19CEBF8DDF7340D84"
    },
    "cell_type" : "markdown",
    "source" : "Set the number of files we want per batch"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "B3862EC0E236492C9428BD57FDC861FB"
    },
    "cell_type" : "code",
    "source" : "val partitionsEachInterval = 10 // 10 files per time max",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0097E19A7DF741E28613D81E35DFCE3A"
    },
    "cell_type" : "markdown",
    "source" : "#### The following will convert each RDD, each Partition elements to Json (using Gson) and save it in a local (can be changed) directory."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "0D7A4D84749E442288AFE1A4C689EB32"
    },
    "cell_type" : "code",
    "source" : "twitterStream\n  .foreachRDD((rdd, time) => {\n    val count = rdd.count()\n    if (count > 0) {\n      val outputRDD = rdd.repartition(partitionsEachInterval)\n      outputRDD.mapPartitions{ it =>\n        val gson = new com.google.gson.Gson()\n        it.map(gson.toJson(_))\n      }.saveAsTextFile(\"/tmp/twitter-stream-json/tweets_\" + time.milliseconds.toString)\n    }\n  })",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "DE90E504E16D4E0F830C49316F29772C"
    },
    "cell_type" : "markdown",
    "source" : "###  Start listening twitter"
  }, {
    "metadata" : {
      "id" : "EC805FE1DB1A486F842D91A2AF0F24C1"
    },
    "cell_type" : "markdown",
    "source" : "This will listen the twitter stream, and the computation above will update the `resuilt` every `2s` using the last `60s` of values."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A70A05775E8649F689BEB7F461E0CAC8"
    },
    "cell_type" : "code",
    "source" : "ssc.start()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8A20C458AA9648938C3344CBB7B938E7"
    },
    "cell_type" : "markdown",
    "source" : "### Stop listening twitter "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "26B15D8A5E56470C8F43DC65E9AE2C45"
    },
    "cell_type" : "code",
    "source" : "// commented to all 'run all' :-D\nssc.stop(false)",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}